home *** CD-ROM | disk | FTP | other *** search
/ Apple Developer Connection Student Program / ADC Tools Sampler CD Disk 3 1999.iso / Metrowerks CodeWarrior / Java Support / Java_Source / Java2 / src / java / io / PipedInputStream.java < prev    next >
Encoding:
Java Source  |  1999-05-28  |  10.2 KB  |  333 lines  |  [TEXT/CWIE]

  1. /*
  2.  * @(#)PipedInputStream.java    1.28 98/07/07
  3.  *
  4.  * Copyright 1995-1998 by Sun Microsystems, Inc.,
  5.  * 901 San Antonio Road, Palo Alto, California, 94303, U.S.A.
  6.  * All rights reserved.
  7.  *
  8.  * This software is the confidential and proprietary information
  9.  * of Sun Microsystems, Inc. ("Confidential Information").  You
  10.  * shall not disclose such Confidential Information and shall use
  11.  * it only in accordance with the terms of the license agreement
  12.  * you entered into with Sun.
  13.  */
  14.  
  15. package java.io;
  16.  
  17. /**
  18.  * A piped input stream should be connected
  19.  * to a piped output stream; the piped  input
  20.  * stream then provides whatever data bytes
  21.  * are written to the piped output  stream.
  22.  * Typically, data is read from a <code>PipedInputStream</code>
  23.  * object by one thread  and data is written
  24.  * to the corresponding <code>PipedOutputStream</code>
  25.  * by some  other thread. Attempting to use
  26.  * both objects from a single thread is not
  27.  * recommended, as it may deadlock the thread.
  28.  * The piped input stream contains a buffer,
  29.  * decoupling read operations from write operations,
  30.  * within limits.
  31.  *
  32.  * @author  James Gosling
  33.  * @version 1.28, 07/07/98
  34.  * @see     java.io.PipedOutputStream
  35.  * @since   JDK1.0
  36.  */
  37. public
  38. class PipedInputStream extends InputStream {
  39.     boolean closedByWriter = false;
  40.     boolean closedByReader = false;
  41.     boolean connected = false;
  42.  
  43.     /* REMIND: identification of the read and write sides needs to be
  44.        more sophisticated.  Either using thread groups (but what about
  45.        pipes within a thread?) or using finalization (but it may be a
  46.        long time until the next GC). */
  47.     Thread readSide;
  48.     Thread writeSide;
  49.  
  50.     /**
  51.      * The size of the pipe's circular input buffer.
  52.      * @since   JDK1.1
  53.      */
  54.     protected static final int PIPE_SIZE = 1024;
  55.  
  56.     /**
  57.      * The circular buffer into which incoming data is placed.
  58.      * @since   JDK1.1
  59.      */
  60.     protected byte buffer[] = new byte[PIPE_SIZE];
  61.  
  62.     /**
  63.      * The index of the position in the circular buffer at which the
  64.      * next byte of data will be stored when received from the connected
  65.      * piped output stream. <code>in<0</code> implies the buffer is empty,
  66.      * <code>in==out</code> implies the buffer is full
  67.      * @since   JDK1.1
  68.      */
  69.     protected int in = -1;
  70.  
  71.     /**
  72.      * The index of the position in the circular buffer at which the next
  73.      * byte of data will be read by this piped input stream.
  74.      * @since   JDK1.1
  75.      */
  76.     protected int out = 0;
  77.  
  78.     /**
  79.      * Creates a <code>PipedInputStream</code> so
  80.      * that it is connected to the piped output
  81.      * stream <code>src</code>. Data bytes written
  82.      * to <code>src</code> will then be  available
  83.      * as input from this stream.
  84.      *
  85.      * @param      src   the stream to connect to.
  86.      * @exception  IOException  if an I/O error occurs.
  87.      */
  88.     public PipedInputStream(PipedOutputStream src) throws IOException {
  89.     connect(src);
  90.     }
  91.  
  92.     /**
  93.      * Creates a <code>PipedInputStream</code> so
  94.      * that it is not  yet connected. It must be
  95.      * connected to a <code>PipedOutputStream</code>
  96.      * before being used.
  97.      *
  98.      * @see     java.io.PipedInputStream#connect(java.io.PipedOutputStream)
  99.      * @see     java.io.PipedOutputStream#connect(java.io.PipedInputStream)
  100.      */
  101.     public PipedInputStream() {
  102.     }
  103.  
  104.     /**
  105.      * Causes this piped input stream to be connected
  106.      * to the piped  output stream <code>src</code>.
  107.      * If this object is already connected to some
  108.      * other piped output  stream, an <code>IOException</code>
  109.      * is thrown.
  110.      * <p>
  111.      * If <code>src</code> is an
  112.      * unconnected piped output stream and <code>snk</code>
  113.      * is an unconnected piped input stream, they
  114.      * may be connected by either the call:
  115.      * <p>
  116.      * <pre><code>snk.connect(src)</code> </pre>
  117.      * <p>
  118.      * or the call:
  119.      * <p>
  120.      * <pre><code>src.connect(snk)</code> </pre>
  121.      * <p>
  122.      * The two
  123.      * calls have the same effect.
  124.      *
  125.      * @param      src   The piped output stream to connect to.
  126.      * @exception  IOException  if an I/O error occurs.
  127.      */
  128.     public void connect(PipedOutputStream src) throws IOException {
  129.     src.connect(this);
  130.     }
  131.  
  132.     /**
  133.      * Receives a byte of data.  This method will block if no input is
  134.      * available.
  135.      * @param b the byte being received
  136.      * @exception IOException If the pipe is broken.
  137.      * @since     JDK1.1
  138.      */
  139.     protected synchronized void receive(int b) throws IOException {
  140.         if (!connected) {
  141.             throw new IOException("Pipe not connected");
  142.         } else if (closedByWriter || closedByReader) {
  143.         throw new IOException("Pipe closed");
  144.     } else if (readSide != null && !readSide.isAlive()) {
  145.             throw new IOException("Read end dead");
  146.         }
  147.  
  148.     writeSide = Thread.currentThread();
  149.     while (in == out) {
  150.         if ((readSide != null) && !readSide.isAlive()) {
  151.         throw new IOException("Pipe broken");
  152.         }
  153.         /* full: kick any waiting readers */
  154.         notifyAll();
  155.         try {
  156.             wait(1000);
  157.         } catch (InterruptedException ex) {
  158.         throw new java.io.InterruptedIOException();
  159.         }
  160.     }
  161.     if (in < 0) {
  162.         in = 0;
  163.         out = 0;
  164.     }
  165.     buffer[in++] = (byte)(b & 0xFF);
  166.     if (in >= buffer.length) {
  167.         in = 0;
  168.     }
  169.     }
  170.  
  171.     /**
  172.      * Receives data into an array of bytes.  This method will
  173.      * block until some input is available.
  174.      * @param b the buffer into which the data is received
  175.      * @param off the start offset of the data
  176.      * @param len the maximum number of bytes received
  177.      * @return the actual number of bytes received, -1 is
  178.      *          returned when the end of the stream is reached.
  179.      * @exception IOException If an I/O error has occurred.
  180.      */
  181.     synchronized void receive(byte b[], int off, int len)  throws IOException {
  182.     while (--len >= 0) {
  183.         receive(b[off++]);
  184.     }
  185.     }
  186.  
  187.     /**
  188.      * Notifies all waiting threads that the last byte of data has been
  189.      * received.
  190.      */
  191.     synchronized void receivedLast() {
  192.     closedByWriter = true;
  193.     notifyAll();
  194.     }
  195.  
  196.     /**
  197.      * Reads the next byte of data from this piped input stream. The
  198.      * value byte is returned as an <code>int</code> in the range
  199.      * <code>0</code> to <code>255</code>. If no byte is available
  200.      * because the end of the stream has been reached, the value
  201.      * <code>-1</code> is returned. This method blocks until input data
  202.      * is available, the end of the stream is detected, or an exception
  203.      * is thrown.
  204.      * If a thread was providing data bytes
  205.      * to the connected piped output stream, but
  206.      * the  thread is no longer alive, then an
  207.      * <code>IOException</code> is thrown.
  208.      *
  209.      * @return     the next byte of data, or <code>-1</code> if the end of the
  210.      *             stream is reached.
  211.      * @exception  IOException  if the pipe is broken.
  212.      */
  213.     public synchronized int read()  throws IOException {
  214.         if (!connected) {
  215.             throw new IOException("Pipe not connected");
  216.         } else if (closedByReader) {
  217.         throw new IOException("Pipe closed");
  218.     } else if (writeSide != null && !writeSide.isAlive()
  219.                    && !closedByWriter && (in < 0)) {
  220.             throw new IOException("Write end dead");
  221.         }
  222.  
  223.         readSide = Thread.currentThread();
  224.     int trials = 2;
  225.     while (in < 0) {
  226.         if (closedByWriter) {
  227.         /* closed by writer, return EOF */
  228.         return -1;
  229.         }
  230.         if ((writeSide != null) && (!writeSide.isAlive()) && (--trials < 0)) {
  231.         throw new IOException("Pipe broken");
  232.         }
  233.             /* might be a writer waiting */
  234.         notifyAll();
  235.         try {
  236.             wait(1000);
  237.         } catch (InterruptedException ex) {
  238.         throw new java.io.InterruptedIOException();
  239.         }
  240.      }
  241.     int ret = buffer[out++] & 0xFF;
  242.     if (out >= buffer.length) {
  243.         out = 0;
  244.     }
  245.     if (in == out) {
  246.             /* now empty */
  247.         in = -1;
  248.     }
  249.     return ret;
  250.     }
  251.  
  252.     /**
  253.      * Reads up to <code>len</code> bytes of data from this piped input
  254.      * stream into an array of bytes. Less than <code>len</code> bytes
  255.      * will be read if the end of the data stream is reached. This method
  256.      * blocks until at least one byte of input is available.
  257.      * If a thread was providing data bytes
  258.      * to the connected piped output stream, but
  259.      * the  thread is no longer alive, then an
  260.      * <code>IOException</code> is thrown.
  261.      *
  262.      * @param      b     the buffer into which the data is read.
  263.      * @param      off   the start offset of the data.
  264.      * @param      len   the maximum number of bytes read.
  265.      * @return     the total number of bytes read into the buffer, or
  266.      *             <code>-1</code> if there is no more data because the end of
  267.      *             the stream has been reached.
  268.      * @exception  IOException  if an I/O error occurs.
  269.      */
  270.     public synchronized int read(byte b[], int off, int len)  throws IOException {
  271.     if (b == null) {
  272.         throw new NullPointerException();
  273.     } else if ((off < 0) || (off > b.length) || (len < 0) ||
  274.            ((off + len) > b.length) || ((off + len) < 0)) {
  275.         throw new IndexOutOfBoundsException();
  276.     } else if (len == 0) {
  277.         return 0;
  278.     }
  279.  
  280.         /* possibly wait on the first character */
  281.     int c = read();
  282.     if (c < 0) {
  283.         return -1;
  284.     }
  285.     b[off] = (byte) c;
  286.     int rlen = 1;
  287.     while ((in >= 0) && (--len > 0)) {
  288.         b[off + rlen] = buffer[out++];
  289.         rlen++;
  290.         if (out >= buffer.length) {
  291.         out = 0;
  292.         }
  293.         if (in == out) {
  294.                 /* now empty */
  295.         in = -1;
  296.         }
  297.     }
  298.     return rlen;
  299.     }
  300.  
  301.     /**
  302.      * Returns the number of bytes that can be read from this input
  303.      * stream without blocking. This method overrides the <code>available</code>
  304.      * method of the parent class.
  305.      *
  306.      * @return     the number of bytes that can be read from this input stream
  307.      *             without blocking.
  308.      * @exception  IOException  if an I/O error occurs.
  309.      * @since   JDK1.0.2
  310.      */
  311.   public synchronized int available() throws IOException {
  312.     if(in < 0)
  313.       return 0;
  314.     else if(in == out)
  315.       return buffer.length;
  316.     else if (in > out)
  317.       return in - out;
  318.     else
  319.       return in + buffer.length - out;
  320.   }
  321.  
  322.     /**
  323.      * Closes this piped input stream and releases any system resources
  324.      * associated with the stream.
  325.      *
  326.      * @exception  IOException  if an I/O error occurs.
  327.      */
  328.     public void close()  throws IOException {
  329.     in = -1;
  330.     closedByReader = true;
  331.     }
  332. }
  333.